package events

import (
	"context"
	"github.com/IBM/sarama"
	"jk-time/webook/feed/domain"
	"jk-time/webook/feed/service"
	"jk-time/webook/pkg/logger"
	"jk-time/webook/pkg/saramax"
	"strconv"
	"time"
)

type ArtilceEvent struct {
	Uid int64
	// 文章 id
	Aid int64
}

func (ArtilceEvent) Topic() string {
	return "article_feed"
}

type ArticleEventConsumer struct {
	client sarama.Client
	l      logger.Logger
	svc    service.FeedService
}

func NewArticleEventConsumer(client sarama.Client, l logger.Logger, svc service.FeedService) *ArticleEventConsumer {
	return &ArticleEventConsumer{client: client, l: l, svc: svc}
}
func (k *ArticleEventConsumer) Start() error {
	cg, err := sarama.NewConsumerGroupFromClient("article_feed", k.client)
	if err != nil {
		return err
	}
	go func() {
		err := cg.Consume(context.Background(),
			[]string{ArtilceEvent{}.Topic()},
			saramax.NewHandler[ArtilceEvent]("article_feed", k.Consume))
		if err != nil {
			k.l.Error("退出了消费循环,异常", logger.Error(err))
		}
	}()
	return err
}
func (k *ArticleEventConsumer) StartBatch() error {
	return nil
}
func (k *ArticleEventConsumer) Consume(msg *sarama.ConsumerMessage, t ArtilceEvent) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	k.l.Info("kafka:", logger.String("topic", t.Topic()),
		logger.String("value", string(msg.Value)))

	return k.svc.CreateFeedEvent(ctx, domain.FeedEvent{
		Type: service.ArticleEventName,
		Ext: map[string]string{
			"uid": strconv.FormatInt(t.Uid, 10),
			"aid": strconv.FormatInt(t.Aid, 10),
		},
	})
}
